在抓取美股1小時數據進行多資產交易模型訓練時,發現不同股票的數據時間戳和數據量往往不一致。這種情況主要由以下幾個原因導致:
為了在訓練模型時使用一致且對齊的數據,我們需要對數據進行對齊處理。以下是解決這個問題的最新方案。
pip install pandas pandas-market-calendars
pandas: 用於數據處理的基本庫。pandas-market-calendars: 用於提供市場交易日曆,方便生成交易時間索引。考慮到市場有夏令和冬令時間,所以使用pandas-market-calendars來取得美股指定時間內的美日盤內交易時間。
import os
import pandas as pd
import pandas_market_calendars as mcal
from PolygonIO.PolygonIODownloader import PolygonIODownloader  # 请确保导入正确
def nyse_get_market_hours(start_date, end_date, freq='1h', tz='UTC'):
    nyse = mcal.get_calendar('NYSE')
    schedule = nyse.schedule(start_date=start_date, end_date=end_date)
    # 生成市场交易时间索引,考虑夏令时和冬令时
    market_times = []
    for idx, row in schedule.iterrows():
        day_open = row['market_open']
        day_close = row['market_close']
        times = pd.date_range(
            start=day_open,
            end=day_close,
            freq=freq,
            tz=tz,
            inclusive='left'  # 使用 'inclusive' 参数
        )
        market_times.extend(times)
    market_times = pd.DatetimeIndex(market_times)
    return market_times
def nyse_filter_and_align_to_market_hours(df,
                    start_date_str,
                    end_date_str,
                    freq='1h',
                    ticker_key='ticker',
                    date_key='timestamp'):
    # 获取市场交易时间索引
    market_times = nyse_get_market_hours(start_date_str,
                                         end_date_str,
                                         freq=freq)
    '''
        美國正確的交易時間是9:30-16:00
        所以這裡抓到的market_times是像下面這樣的資料
        DatetimeIndex([
            '2019-09-23 13:30:00+00:00', '2019-09-23 14:30:00+00:00',
            '2019-09-23 15:30:00+00:00', '2019-09-23 16:30:00+00:00',
            '2019-09-23 17:30:00+00:00', '2019-09-23 18:30:00+00:00',
            '2019-09-23 19:30:00+00:00', ...],
            dtype='datetime64[ns, UTC]', length=8779, freq=None)>
        
        但是從polygon.io抓到的'1h'資料, 是align到整點的
            2019-09-23 08:00:00+00:00
            2019-09-23 09:00:00+00:00
            2019-09-23 10:00:00+00:00
            2019-09-23 11:00:00+00:00
            
        因此這裡要把market_times的時間也align到整點
    '''
    market_times = market_times.floor('h')
    # 获取所有股票代码
    tickers = df[ticker_key].unique()
    # 创建多重索引:时间戳和股票代码
    index = pd.MultiIndex.from_product([market_times, tickers],
                                       names=[date_key, ticker_key])
    # 设置数据的索引为 ['date', ticker_key]
    df = df.set_index([date_key, ticker_key])
    # 重新索引数据,缺失值将被填充为 NaN
    df_aligned = df.reindex(index)
    # 重置索引,恢复为普通列
    df_aligned = df_aligned.reset_index(drop=False)
    return df_aligned
對於對齊後的資料可能會出現一些NaN,首先從過往數據中找出最近的一筆數據來填充;若無過去數據,則只好使用第一筆有效數據來填充。
def sort_and_fill_null(df, ticker_key='ticker', date_key='timestamp'):
    # 处理缺失值,先向后填充,再前向填充
    df = df.groupby(ticker_key).apply(lambda group: group.bfill().ffill())
    df = df.reset_index(drop=True)
    # 重置索引,确保 'ticker' 仅作为列存在
    df = df.sort_values([date_key, ticker_key]).reset_index(drop=True)
    return df
def get_sp500_tickers() -> list:
    """
    從維基百科抓取S&P 500的股票代碼列表。
    """
    try:
        url = 'https://en.wikipedia.org/wiki/List_of_S%26P_500_companies'
        tables = pd.read_html(url)
        df = tables[0]
        tickers = df['Symbol'].tolist()
        # 處理一些特殊字符,例如替換'.'為'-',因為Polygon.io可能使用不同的格式
        # tickers = [ticker.replace('.', '-') for ticker in tickers]
        return tickers
    except Exception as e:
        print(f"Error fetching S&P 500 tickers: {e}")
        return []
def get_dji30_tickers() -> list:
    """
    返回DJI 30的股票代碼列表。
    """
    return [
        'AAPL', 'AMGN', 'AXP', 'BA', 'CAT', 'CRM', 'CSCO', 'CVX', 'DIS', 'DOW',
        'GS', 'HD', 'HON', 'IBM', 'INTC', 'JNJ', 'JPM', 'KO', 'MCD', 'MMM',
        'MRK', 'MSFT', 'NKE', 'PG', 'TRV', 'UNH', 'V', 'VZ', 'WBA', 'WMT'
    ]
def extract_features_for_ticker(df_ticker):
    from finrl.meta.preprocessor.preprocessors import FeatureEngineer
    # 对于每个 ticker,先计算特征
    fe = FeatureEngineer(use_technical_indicator=True,
                         tech_indicator_list=["macd", "rsi", "cci", "adx"])
    
    df_features = fe.preprocess_data(df_ticker)
    return df_features
def main():
    import random
    from tqdm import tqdm
    # 初始化 PolygonIODownloader
    polygon_wrapper = PolygonIODownloader()
    # 设置股票代码和日期范围
    # ticker_list = get_sp500_tickers() + get_dji30_tickers()
    ticker_list = get_dji30_tickers()
    ticker_list = list(set(ticker_list))  # 去重複
    start_date_str = '2019-09-21'
    end_date_str = '2024-09-21'
    # 1. 抓取所有股票的1小时OHLCV数据
    df_1h_ohlcv = polygon_wrapper.fetch_ohlcv(ticker_list,
                                              start_date_str,
                                              end_date_str,
                                              timespan='hour')
    
    # 重命名列,以符合FinRL处理的需求
    df_1h_ohlcv = df_1h_ohlcv.rename(columns={
        'timestamp': 'date',
        'ticker': 'tic'
    })
    # 2. 先過濾到交易時間並對齊
    df_1h_ohlcv = nyse_filter_and_align_to_market_hours(df_1h_ohlcv,
                                                        start_date_str,
                                                        end_date_str,
                                                        freq='1h',
                                                        ticker_key='tic',
                                                        date_key='date')
    # 3. 初次填補NaN(价格bfill、ffill,Volume填0)
    df_1h_ohlcv = sort_and_fill_null(df_1h_ohlcv,
                                     ticker_key='tic',
                                     date_key='date')
    # 4. 对每个ticker进行特徵擷取
    df_features_all = []
    for ticker in tqdm(ticker_list, desc="Processing tickers"):
        df_ticker = df_1h_ohlcv[df_1h_ohlcv['tic'] == ticker]
        df_features = extract_features_for_ticker(df_ticker)
        df_features_all.append(df_features)
    # 合并所有ticker的特徵数据
    df_features_all = pd.concat(df_features_all)
    # 5. 计算特徵后的填补NaN(如果特徵仍有NaN,再次填补)
    df = df_features_all.fillna(0)
    # 照日期排序
    df = df.sort_values(['date', 'tic']).reset_index(drop=True)
    # 6. 按时间分割数据集
    split_date = '2024-01-01'
    df_train = df[df['date'] < split_date]  # 2024年以前的数据作为训练数据
    df_trade = df[df['date'] >= split_date]  # 2024年及以后的数据作为交易数据
    # 保存为 CSV 文件
    df_train.to_csv(f'sp500_1hour_{start_date_str}_{split_date}_train.csv', index=False)
    df_trade.to_csv(f'sp500_1hour_{split_date}_{end_date_str}_trade.csv', index=False)
    # 打印部分输出
    print("Train Data Sample:")
    print(df_train.head())
    print("\nTrade Data Sample:")
    print(df_trade.head())
if __name__ == "__main__":
    main()
通過以上步驟,我將美股不同股票的一小時數據在時間上進行對齊,確保在同一時間點上擁有所有股票的數據。這對於訓練多資產的量化交易模型非常重要。最新的解決方案還包含了自動處理數據的特徵擷取和填充缺失值步驟,這樣可以在數據準備過程中更好地支持後續的模型訓練和測試。
跑完後將會生成兩個檔案
f'sp500_1hour_{start_date_str}_{split_date}_train.csv': 訓練資料f'sp500_1hour_{split_date}_{end_date_str}_trade.csv': 驗證資料接下來會使用一開始stock trading的程式碼來訓練1小時的數據,然後做Backtest看看效果。
說實話,在這波處理完資料後,我開始考慮之後使用加密貨幣來做測試,因為美股的開盤時間其實很短,感覺一小時數據量也沒有比日線資料豐富多少,交易機會也沒有多很多;這樣的情況下,似乎可以嘗試看看加密貨幣,但加密貨幣的缺點是數據太少,波動過於劇烈,可能難度會更高;總之,現在我仍然會照計畫繼續使用美股一小時數據來嘗試做自動交易,加密貨幣留待日後嘗試。